package com.atguigu.day10;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demo of a user-defined table aggregate function (UDTAF) used through the Flink Table API.
 *
 * <p>The job reads comma-separated text lines from a socket, converts each line into a
 * {@link WaterSensor}, groups the resulting table by {@code id} and prints the two largest
 * {@code vc} values per group together with their rank ("1" or "2").
 */
public class Flink06_SQL_UDF_UDTAF {
    public static void main(String[] args) {
        // 1. Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // 2. Read lines from the socket and convert each one into a WaterSensor.
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] split = value.split(",");
                        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
                        if (split.length < 3) {
                            throw new IllegalArgumentException(
                                    "Expected at least 3 comma-separated fields but got: " + value);
                        }
                        // Trim only the numeric fields so that stray blanks or a trailing '\r'
                        // do not break parsing; the first field is passed through unchanged.
                        return new WaterSensor(
                                split[0],
                                Long.parseLong(split[1].trim()),
                                Integer.parseInt(split[2].trim()));
                    }
                });

        // 3. Obtain the table environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 4. Convert the stream into a table.
        Table table = tableEnv.fromDataStream(waterSensorStream);

        // 5. The function can also be used inline, without registering it first, by passing
        //    the class itself: call(MyUDTAF.class, $("vc")).as("value", "rank").
        //    Without the as(...) alias the two result columns are referenced as f0 and f1.
        //    Here the function is registered under a name first and then called by that name.
        tableEnv.createTemporarySystemFunction("vcTop", MyUDTAF.class);

        // Table API query: top two vc values per id.
        table.groupBy($("id"))
                .flatAggregate(call("vcTop", $("vc")).as("value", "rank"))
                .select($("id"), $("value"), $("rank"))
                .execute()
                .print();

    }

    /**
     * Accumulator for {@link MyUDTAF}: holds the largest and second-largest value seen so far.
     *
     * <p>{@link Integer#MIN_VALUE} is used by {@link MyUDTAF} as the "not set yet" marker.
     * The field name {@code fitstVc} (sic) is kept as-is because it is public.
     */
    public static class MyTopAcc{
        /** Largest value seen so far. */
        public Integer fitstVc;
        /** Second-largest value seen so far. */
        public Integer secondVc;
    }

    /**
     * Table aggregate function (UDTAF, many rows in / many rows out) that emits the two
     * highest values it has accumulated, each paired with its rank as a string.
     *
     * <p>NOTE: because {@link Integer#MIN_VALUE} marks an empty slot, an input that is exactly
     * {@code Integer.MIN_VALUE} is never recorded or emitted.
     */
    public static class MyUDTAF extends TableAggregateFunction<Tuple2<Integer,String>,MyTopAcc>{

        /**
         * Creates an accumulator with both slots set to the "not set yet" marker.
         *
         * @return a fresh accumulator
         */
        @Override
        public MyTopAcc createAccumulator() {
            MyTopAcc myTopAcc = new MyTopAcc();
            myTopAcc.fitstVc = Integer.MIN_VALUE;
            myTopAcc.secondVc = Integer.MIN_VALUE;
            return myTopAcc;
        }

        /**
         * Folds one input value into the accumulator.
         *
         * @param acc   accumulator to update
         * @param value incoming value; {@code null} is ignored
         */
        public void accumulate(MyTopAcc acc,Integer value){
            // A null input would otherwise fail on auto-unboxing in the comparisons below.
            if (value == null) {
                return;
            }

            if (value > acc.fitstVc) {
                // New maximum: the previous first place moves down to second place.
                acc.secondVc = acc.fitstVc;
                acc.fitstVc = value;
            } else if (value > acc.secondVc) {
                // Not a new maximum, but larger than the current second place.
                acc.secondVc = value;
            }
        }

        /**
         * Emits up to two rows: (first, "1") and (second, "2"). A slot that still holds the
         * "not set yet" marker is skipped.
         *
         * @param acc accumulator holding the current top two values
         * @param out collector receiving the (value, rank) tuples
         */
        public void emitValue(MyTopAcc acc, Collector<Tuple2<Integer,String>> out){
            if (acc.fitstVc != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.fitstVc, "1"));
            }

            if (acc.secondVc != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.secondVc, "2"));
            }
        }

    }

}
